/*
 * Cainiao.com Inc.
 * Copyright (c) 2013-2021 All Rights Reserved.
 */

package com.gitee.suveng.stream;

import java.net.URL;

import cn.hutool.core.io.resource.ResourceUtil;
import cn.hutool.log.StaticLog;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * 流处理 word count 程序
 *
 * @author suwenguang
 * @since 2021-01-21 20:35
 */
public class WordCountStream {
	/**
	 * Entry point: reads {@code data/data.txt} from the classpath, splits each
	 * line into words, and prints a running (word, count) aggregation.
	 *
	 * @param args unused command-line arguments
	 * @throws Exception if the Flink job fails to execute
	 */
	public static void main(String[] args) throws Exception {
		// Create the stream-processing execution environment.
		StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
		// Set the parallelism of the job.
		streamExecutionEnvironment.setParallelism(3);
		// Resolve the input file from the classpath and read it as a stream of lines.
		String filePath = "data/data.txt";
		URL resource = ResourceUtil.getResource(filePath);
		DataStream<String> readTextFile = streamExecutionEnvironment.readTextFile(resource.getPath());

		// Tokenize each line, emit (word, 1), key by word, and sum the counts.
		SingleOutputStreamOperator<Tuple2<String, Integer>> sum = readTextFile.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
			@Override
			public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
				// Split on runs of whitespace (not a single space) so that
				// consecutive spaces or tabs do not yield empty tokens.
				String[] wordArr = value.split("\\s+");
				StaticLog.info("value: {}", value);

				for (String word : wordArr) {
					// Skip blank tokens (e.g. from a line with leading whitespace).
					if (word.isEmpty()) {
						continue;
					}
					out.collect(new Tuple2<>(word, 1));
				}
			}
			// keyBy(int) is deprecated since Flink 1.11 (removed later);
			// use a lambda KeySelector keyed on the word field instead.
		}).keyBy(tuple -> tuple.f0).sum(1);

		sum.print();

		// Give the job an explicit name for easier identification in the UI/logs.
		streamExecutionEnvironment.execute("WordCountStream");
	}
}
